From ac58bbd500aa31a6297112b348d02480b0d58c88 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Wed, 3 Oct 2018 16:57:52 +0200 Subject: [PATCH] Work on buffer resizable --- ChangeLog-2.0.30 | 4 +- include/procinfo/procinfo.h | 2 +- include/siri/db/buffer.h | 31 +++- include/siri/db/db.h | 9 +- src/procinfo/procinfo.c | 22 ++- src/siri/backup.c | 6 +- src/siri/buffersync.c | 2 +- src/siri/db/buffer.c | 177 +++++++++++--------- src/siri/db/db.c | 325 +++++++++++++++++++----------------- src/siri/db/insert.c | 4 +- src/siri/db/props.c | 4 +- src/siri/db/series.c | 16 +- src/siri/db/server.c | 8 +- src/siri/db/servers.c | 4 +- 14 files changed, 342 insertions(+), 272 deletions(-) diff --git a/ChangeLog-2.0.30 b/ChangeLog-2.0.30 index 3bfa16bc..0f6b6d11 100644 --- a/ChangeLog-2.0.30 +++ b/ChangeLog-2.0.30 @@ -8,4 +8,6 @@ * Added option to fsync the buffer on a configurable interval. - * Use posix_fadvise() on the buffer file. (@Svedrin) \ No newline at end of file + * Use posix_fadvise() on the buffer file. (@Svedrin) + + * Refactor buffer and cleanup alternative buffer path. \ No newline at end of file diff --git a/include/procinfo/procinfo.h b/include/procinfo/procinfo.h index 1b905ef9..7ad5df70 100644 --- a/include/procinfo/procinfo.h +++ b/include/procinfo/procinfo.h @@ -25,7 +25,7 @@ long int procinfo_total_physical_memory(void); long int procinfo_total_virtual_memory(void); /* Total Open Files */ -long int procinfo_open_files(const char * path); +long int procinfo_open_files(const char * path, int include_fd); #endif /* PROCINFO_H_ */ diff --git a/include/siri/db/buffer.h b/include/siri/db/buffer.h index 343c7bfa..8b4ca624 100644 --- a/include/siri/db/buffer.h +++ b/include/siri/db/buffer.h @@ -12,6 +12,8 @@ #ifndef SIRIDB_BUFFER_H_ #define SIRIDB_BUFFER_H_ +typedef struct siridb_buffer_s siridb_buffer_t; + #include #include #include @@ -19,19 +21,36 @@ #define MAX_BUFFER_SZ 10485760 + +siridb_buffer_t * siridb_buffer_new(void); +void siridb_buffer_free(siridb_buffer_t * buffer); int siridb_buffer_new_series( - siridb_t * siridb, + siridb_buffer_t * buffer, siridb_series_t * series); -int siridb_buffer_open(siridb_t * siridb); +int siridb_buffer_open(siridb_buffer_t * buffer); int siridb_buffer_load(siridb_t * siridb); -void siridb_buffer_free(siridb_t * siridb); int siridb_buffer_write_empty( - siridb_t * siridb, + siridb_buffer_t * buffer, siridb_series_t * series); int siridb_buffer_write_last_point( - siridb_t * siridb, + siridb_buffer_t * buffer, siridb_series_t * series); -int siridb_buffer_fsync(siridb_t * siridb); +struct siridb_buffer_s +{ + size_t size; /* size for one series inside the buffer */ + size_t nsize; /* optional new size from database.conf */ + size_t len; /* number of points allocated per series */ + char * template; /* template for writing an empty buffer */ + char * path; /* path where the buffer file is stored */ + slist_t * empty; /* list with empty buffer spaces */ + FILE * fp; /* buffer file pointer */ + int fd; /* buffer file descriptor */ +}; + +static inline int siridb_buffer_fsync(siridb_buffer_t * buffer) +{ + return (buffer->fp == NULL) ? 0 : fsync(buffer->fd); +} #endif /* SIRIDB_BUFFER_H_ */ diff --git a/include/siri/db/db.h b/include/siri/db/db.h index ec7bae6e..a9469e7e 100644 --- a/include/siri/db/db.h +++ b/include/siri/db/db.h @@ -42,6 +42,7 @@ typedef struct siridb_s siridb_t; #include #include #include +#include int32_t siridb_get_uptime(siridb_t * siridb); int8_t siridb_get_idle_percentage(siridb_t * siridb); @@ -71,19 +72,15 @@ struct siridb_s uint32_t list_limit; uuid_t uuid; iso8601_tz_t tz; - size_t buffer_size; - size_t buffer_len; - char * buffer_clear; struct timespec start_time; /* to calculate up-time. */ uint64_t duration_num; /* number duration in s, ms, us or ns */ uint64_t duration_log; /* log duration in s, ms, us or ns */ char * dbname; char * dbpath; - char * buffer_path; double drop_threshold; size_t received_points; size_t selected_points; - slist_t * empty_buffers; + siridb_time_t * time; siridb_server_t * server; siridb_server_t * replica; @@ -95,13 +92,13 @@ struct siridb_s uv_mutex_t series_mutex; uv_mutex_t shards_mutex; imap_t * shards; - FILE * buffer_fp; FILE * dropped_fp; qp_fpacker_t * store; siridb_fifo_t * fifo; siridb_replicate_t * replicate; siridb_reindex_t * reindex; siridb_groups_t * groups; + siridb_buffer_t * buffer; siridb_tasks_t tasks; }; diff --git a/src/procinfo/procinfo.c b/src/procinfo/procinfo.c index 4598332c..ea725b02 100644 --- a/src/procinfo/procinfo.c +++ b/src/procinfo/procinfo.c @@ -109,7 +109,7 @@ long int procinfo_total_physical_memory(void) #endif #ifdef __APPLE__ -long int procinfo_open_files(const char * path) +long int procinfo_open_files(const char * path, int include_fd) { pid_t pid = getpid(); size_t len = strlen(path); @@ -147,15 +147,25 @@ long int procinfo_open_files(const char * path) if ( res == PROC_PIDFDVNODEPATHINFO_SIZE && strncmp(path, vnode_info.pvip.vip_path, len) == 0) { + vnode_info count++; } + else if ( + res == PROC_PIDFDVNODEPATHINFO_SIZE && + include_fd >= 0 && + include_fd == fd_info[i].proc_fd) + { + include_fd = -1; + count++; + }; + } } free(fd_info); return count; } #else -long int procinfo_open_files(const char * path) +long int procinfo_open_files(const char * path, int include_fd) { long int count = 0; DIR * dirp; @@ -174,7 +184,6 @@ long int procinfo_open_files(const char * path) if (entry->d_type == DT_REG || entry->d_type == DT_LNK) { snprintf(buffer, XPATH_MAX, "/proc/self/fd/%s", entry->d_name); - if (realpath(buffer, buf) == NULL) { continue; @@ -184,6 +193,13 @@ long int procinfo_open_files(const char * path) { count++; } + else if ( + include_fd >= 0 && + include_fd == strtol(entry->d_name, NULL, 10)) + { + include_fd = -1; + count++; + }; } } closedir(dirp); diff --git a/src/siri/backup.c b/src/siri/backup.c index 628f33e1..7a843372 100644 --- a/src/siri/backup.c +++ b/src/siri/backup.c @@ -173,11 +173,11 @@ static void BACKUP_walk(siridb_t * siridb, void * args __attribute__((unused))) siridb_fifo_close(siridb->fifo); } - if (siridb->buffer_fp != NULL) + if (siridb->buffer->fp != NULL) { - if (fclose(siridb->buffer_fp) == 0) + if (fclose(siridb->buffer->fp) == 0) { - siridb->buffer_fp = NULL; + siridb->buffer->fp = NULL; } else { diff --git a/src/siri/buffersync.c b/src/siri/buffersync.c index e6723dbe..07d5fa0d 100644 --- a/src/siri/buffersync.c +++ b/src/siri/buffersync.c @@ -60,7 +60,7 @@ static void BUFFERSYNC_cb(uv_timer_t * handle __attribute__((unused))) siridb = (siridb_t *) siridb_node->data; /* flush the buffer, maybe on each insert or another interval? */ - if (siridb_buffer_fsync(siridb)) + if (siridb_buffer_fsync(siridb->buffer)) { log_critical("fsync() has failed on the buffer file"); } diff --git a/src/siri/db/buffer.c b/src/siri/db/buffer.c index 88d923a6..3e5d6b1f 100644 --- a/src/siri/db/buffer.c +++ b/src/siri/db/buffer.c @@ -26,8 +26,12 @@ /* when set to 1, no caching is done. 1 is the minimum value. */ #define SIRIDB_BUFFER_CACHE 64 -static int buffer__create_new(siridb_t * siridb, siridb_series_t * series); -static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series); +static int buffer__create_new( + siridb_buffer_t * buffer, + siridb_series_t * series); +static int buffer__use_empty( + siridb_buffer_t * buffer, + siridb_series_t * series); static void buffer__migrate_to_new(char * pt, size_t sz); /* buffer__start cannot conflict with a series_id since id 0 is never used */ @@ -35,25 +39,61 @@ static const uint32_t buffer__start = 0x00000000; static const uint64_t buffer__end = 0xffffffffffffffff; +siridb_buffer_t * siridb_buffer_new(void) +{ + siridb_buffer_t * buffer = malloc(sizeof(siridb_buffer_t)); + if (buffer == NULL) + { + return NULL; + } + buffer->empty = slist_new(SLIST_DEFAULT_SIZE); + if (buffer->empty == NULL) + { + free(buffer); + return NULL; + } + buffer->fd = 0; + buffer->fp = NULL; + buffer->len = 0; + buffer->nsize = 0; + buffer->path = NULL; + buffer->size = 0; + buffer->template = NULL; + + return buffer; +} + +void siridb_buffer_free(siridb_buffer_t * buffer) +{ + if (buffer->fp != NULL) + { + fclose(buffer->fp); + } + free(buffer->template); + free(buffer->path); + slist_free(buffer->empty); + free(buffer); +} + /* * Returns 0 if success or EOF in case of an error. */ int siridb_buffer_write_empty( - siridb_t * siridb, + siridb_buffer_t * buffer, siridb_series_t * series) { - memcpy(siridb->buffer_clear + 4, &series->id, sizeof(uint32_t)); + memcpy(buffer->template + 4, &series->id, sizeof(uint32_t)); return ( /* go to the series position in buffer */ - fseeko( siridb->buffer_fp, + fseeko( buffer->fp, series->bf_offset, SEEK_SET) || /* write end ts */ - fwrite( siridb->buffer_clear, - siridb->buffer_size, + fwrite( buffer->template, + buffer->size, 1, - siridb->buffer_fp) != 1) ? EOF : 0; + buffer->fp) != 1) ? EOF : 0; } /* @@ -63,7 +103,7 @@ int siridb_buffer_write_empty( * Returns 0 if success or EOF in case of an error. */ int siridb_buffer_write_last_point( - siridb_t * siridb, + siridb_buffer_t * buffer, siridb_series_t * series) { siridb_point_t * point; @@ -79,67 +119,62 @@ int siridb_buffer_write_last_point( return ( /* jump to position where to write the new point */ - fseeko( siridb->buffer_fp, + fseeko( buffer->fp, series->bf_offset + 8 + (16 * last_idx), SEEK_SET) || /* write time-stamp and value */ - fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0; + fwrite(buf, sz, 1, buffer->fp) != 1) ? EOF : 0; } /* * Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred. */ -int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series) +int siridb_buffer_new_series( + siridb_buffer_t * buffer, + siridb_series_t * series) { /* allocate new buffer */ - series->buffer = siridb_points_new(siridb->buffer_len, series->tp); + series->buffer = siridb_points_new(buffer->len, series->tp); if (series->buffer == NULL) { return -1; /* signal is raised */ } - return (siridb->empty_buffers->len) ? - buffer__use_empty(siridb, series) : - buffer__create_new(siridb, series); -} - -int siridb_buffer_fsync(siridb_t * siridb) -{ - if (siridb->buffer_fp == NULL) - { - return 0; - } - int buffer_fd = fileno(siridb->buffer_fp); - return (buffer_fd != -1) ? fsync(buffer_fd) : -1; + return (buffer->empty->len) ? + buffer__use_empty(buffer, series) : + buffer__create_new(buffer, series); } /* * Returns 0 if successful or -1 in case of an error. */ -int siridb_buffer_open(siridb_t * siridb) +int siridb_buffer_open(siridb_buffer_t * buffer) { - int buffer_fd, rc; - siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN) + const int flags = POSIX_FADV_RANDOM | POSIX_FADV_DONTNEED; + int rc; + siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN) - if ((siridb->buffer_fp = fopen(fn, "r+")) == NULL) + if ((buffer->fp = fopen(fn, "r+")) == NULL) { log_critical("Cannot open '%s' for reading and writing", fn); return -1; } - buffer_fd = fileno(siridb->buffer_fp); + buffer->fd = fileno(buffer->fp); - if (buffer_fd == -1) + if (buffer->fd == -1) { log_critical("Cannot get file descriptor: '%s'", fn); + fclose(buffer->fp); + buffer->fp = NULL; return -1; } #ifdef __APPLE__ rc = 0; /* no posix_fadvise on apple */ #else - rc = posix_fadvise(buffer_fd, 0, 0, POSIX_FADV_RANDOM|POSIX_FADV_DONTNEED); + rc = posix_fadvise(buffer->fd, 0, 0, flags); if (rc) { log_warning("Cannot set advice for file access: '%s' (%d)", fn, rc); @@ -177,11 +212,12 @@ static void buffer__migrate_to_new(char * pt, size_t sz) */ int siridb_buffer_load(siridb_t * siridb) { + siridb_buffer_t * buffer = siridb->buffer; FILE * fp; FILE * fp_temp; size_t read_at_once = 8; size_t num, i; - char buffer[siridb->buffer_size * read_at_once]; + char buf[buffer->size * read_at_once]; char * pt, * end; long int offset = 0; siridb_series_t * series; @@ -191,25 +227,25 @@ int siridb_buffer_load(siridb_t * siridb) log_info("Loading and cleanup buffer"); - siridb->buffer_clear = malloc(siridb->buffer_size); - if (siridb->buffer_clear == NULL) + buffer->template = malloc(buffer->size); + if (buffer->template == NULL) { log_critical("Allocation error while loading buffer"); return -1; } - for ( pt = siridb->buffer_clear, - end = siridb->buffer_clear + siridb->buffer_size; + for ( pt = buffer->template, + end = buffer->template + buffer->size; pt < end; pt += sizeof(uint64_t)) { memcpy(pt, &buffer__end, sizeof(uint64_t)); } - memcpy(siridb->buffer_clear, &buffer__start, sizeof(uint32_t)); + memcpy(buffer->template, &buffer__start, sizeof(uint32_t)); - siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN) - siridb_misc_get_fn(fn_temp, siridb->buffer_path, "__" SIRIDB_BUFFER_FN) + siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN) + siridb_misc_get_fn(fn_temp, buffer->path, "__" SIRIDB_BUFFER_FN) if (xpath_file_exist(fn_temp)) { @@ -243,11 +279,11 @@ int siridb_buffer_load(siridb_t * siridb) return -1; } - while ((num = fread(buffer, siridb->buffer_size, read_at_once, fp))) + while ((num = fread(buf, buffer->size, read_at_once, fp))) { for (i = 0; i < num; i++) { - pt = buffer + i * siridb->buffer_size; + pt = buf + i * buffer->size; buf_start = *((uint32_t *) pt); if (buf_start != buffer__start) @@ -257,7 +293,7 @@ int siridb_buffer_load(siridb_t * siridb) log_warning("Buffer will be migrated"); log_migrate = 0; } - buffer__migrate_to_new(pt, siridb->buffer_size); + buffer__migrate_to_new(pt, buffer->size); } pt += sizeof(uint32_t); @@ -271,7 +307,7 @@ int siridb_buffer_load(siridb_t * siridb) continue; } - series->buffer = siridb_points_new(siridb->buffer_len, series->tp); + series->buffer = siridb_points_new(buffer->len, series->tp); if (series->buffer == NULL) { log_critical("Cannot allocate a buffer for series id %u", @@ -290,14 +326,13 @@ int siridb_buffer_load(siridb_t * siridb) siridb_points_add_point(series->buffer, ts, val); } - offset += siridb->buffer_size; + offset += buffer->size; /* increment series->length which is 0 at this time */ series->length += series->buffer->len; /* write to output file and check if write was successful */ - if ((fwrite(buffer + i * siridb->buffer_size, - siridb->buffer_size, 1, fp_temp) != 1)) + if ((fwrite(buf + i*buffer->size, buffer->size, 1, fp_temp) != 1)) { log_critical("Could not write to temporary buffer file: '%s'", fn_temp); @@ -320,17 +355,6 @@ int siridb_buffer_load(siridb_t * siridb) return 0; } -void siridb_buffer_free(siridb_t * siridb) -{ - if (siridb->buffer_fp != NULL) - { - fclose(siridb->buffer_fp); - siridb->buffer_fp = NULL; - } - free(siridb->buffer_clear); - siridb->buffer_clear = NULL; -} - /* * Reserve a space in the buffer for a new series. The position of this space * in the buffer is read from siridb->empty_buffers so this list must have @@ -341,11 +365,13 @@ void siridb_buffer_free(siridb_t * siridb) * Note that an available spot must be checked before calling this function. * This functions has undefined behavior if no spot is found. */ -static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series) +static int buffer__use_empty( + siridb_buffer_t * buffer, + siridb_series_t * series) { - series->bf_offset = (long int) slist_pop(siridb->empty_buffers); + series->bf_offset = (long int) slist_pop(buffer->empty); - if (siridb_buffer_write_empty(siridb, series)) + if (siridb_buffer_write_empty(buffer, series)) { ERR_FILE return -1; @@ -361,58 +387,53 @@ static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series) * * Returns 0 if successful or -1 and a signal is raised in case of an error. */ -static int buffer__create_new(siridb_t * siridb, siridb_series_t * series) +static int buffer__create_new( + siridb_buffer_t * buffer, + siridb_series_t * series) { long int buffer_pos; - /* get file descriptor */ - int buffer_fd = fileno(siridb->buffer_fp); - if (buffer_fd == -1) - { - ERR_FILE - return -1; - } /* jump to end of buffer */ - if (fseeko(siridb->buffer_fp, 0, SEEK_END)) + if (fseeko(buffer->fp, 0, SEEK_END)) { ERR_FILE return -1; } /* bind the current offset to the new series */ - if ((series->bf_offset = ftello(siridb->buffer_fp)) == -1) + if ((series->bf_offset = ftello(buffer->fp)) == -1) { ERR_FILE return -1; } /* write buffer start and series ID to buffer */ - if (siridb_buffer_write_empty(siridb, series)) + if (siridb_buffer_write_empty(buffer, series)) { ERR_FILE return -1; } - buffer_pos = series->bf_offset + siridb->buffer_size * SIRIDB_BUFFER_CACHE; + buffer_pos = series->bf_offset + buffer->size * SIRIDB_BUFFER_CACHE; /* fill buffer with zeros if possible */ - if (ftruncate(buffer_fd, buffer_pos)) + if (ftruncate(buffer->fd, buffer_pos)) { ERR_FILE return -1; } /* commit changes to disk */ - if (fsync(buffer_fd)) + if (fsync(buffer->fd)) { ERR_FILE return -1; } - while ((buffer_pos -= siridb->buffer_size) > series->bf_offset) + while ((buffer_pos -= buffer->size) > series->bf_offset) { - slist_append_safe(&siridb->empty_buffers, (void *) buffer_pos); + slist_append_safe(&buffer->empty, (void *) buffer_pos); } return 0; diff --git a/src/siri/db/db.c b/src/siri/db/db.c index 7946d077..3e94fe8d 100644 --- a/src/siri/db/db.c +++ b/src/siri/db/db.c @@ -47,13 +47,15 @@ * */ -static siridb_t * SIRIDB_new(void); - -static int SIRIDB_from_unpacker( +static siridb_t * siridb__new(void); +static int siridb__from_unpacker( qp_unpacker_t * unpacker, siridb_t ** siridb, const char * dbpath, char * err_msg); +static siridb_t * siridb__from_dat(const char * dbpath); +static int siridb__read_conf(siridb_t * siridb); +static int siridb__lock(const char * dbpath, int lock_flags); #define READ_DB_EXIT_WITH_ERROR(ERROR_MSG) \ strcpy(err_msg, ERROR_MSG); \ @@ -85,7 +87,6 @@ int8_t siridb_get_idle_percentage(siridb_t * siridb) return (idle > 100) ? 100 : idle; } - /* * Check if at least database.conf and database.dat exist in the path. */ @@ -120,14 +121,7 @@ int siridb_is_db_path(const char * dbpath) siridb_t * siridb_new(const char * dbpath, int lock_flags) { size_t len = strlen(dbpath); - lock_t lock_rc; - char buffer[XPATH_MAX]; - cfgparser_t * cfgparser; - cfgparser_option_t * option = NULL; - qp_unpacker_t * unpacker; siridb_t * siridb; - char err_msg[512]; - int rc; size_t i; if (!len || dbpath[len - 1] != '/') @@ -143,118 +137,25 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) return NULL; } - lock_rc = lock_lock(dbpath, lock_flags); - - switch (lock_rc) - { - case LOCK_IS_LOCKED_ERR: - case LOCK_PROCESS_NAME_ERR: - case LOCK_WRITE_ERR: - case LOCK_READ_ERR: - case LOCK_MEM_ALLOC_ERR: - log_error("%s (%s)", lock_str(lock_rc), dbpath); - return NULL; - case LOCK_NEW: - log_info("%s (%s)", lock_str(lock_rc), dbpath); - break; - case LOCK_OVERWRITE: - log_warning("%s (%s)", lock_str(lock_rc), dbpath); - break; - default: - assert (0); - break; - } - - /* read database.conf */ - snprintf(buffer, - XPATH_MAX, - "%sdatabase.conf", - dbpath); - - cfgparser = cfgparser_new(); - if (cfgparser == NULL) - { - return NULL; /* signal is raised */ - } - if ((rc = cfgparser_read(cfgparser, buffer)) != CFGPARSER_SUCCESS) - { - log_error("Could not read '%s': %s", - buffer, - cfgparser_errmsg(rc)); - cfgparser_free(cfgparser); - return NULL; - } - - snprintf(buffer, - XPATH_MAX, - "%sdatabase.dat", - dbpath); - - if ((unpacker = qp_unpacker_ff(buffer)) == NULL) - { - /* qp_unpacker has done some logging */ - cfgparser_free(cfgparser); - return NULL; - } - - if ((rc = SIRIDB_from_unpacker( - unpacker, - &siridb, - dbpath, - err_msg)) < 0) + if (siridb__lock(dbpath, lock_flags)) { - log_error("Could not read '%s': %s", buffer, err_msg); - qp_unpacker_ff_free(unpacker); - cfgparser_free(cfgparser); + log_error("Cannot lock database path '%s'", dbpath); return NULL; } - qp_unpacker_ff_free(unpacker); - - if (rc > 0 && siridb_save(siridb)) + siridb = siridb__from_dat(dbpath); + if (siridb == NULL) { - log_error("Could not write file: %s", buffer); - cfgparser_free(cfgparser); - siridb_decref(siridb); + log_error("Cannot load SiriDB from database path '%s'", dbpath); return NULL; } log_info("Start loading database: '%s'", siridb->dbname); - /* read buffer_path from database.conf */ - rc = cfgparser_get_option( - &option, - cfgparser, - "buffer", - "path"); - - if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING) - { - len = strlen(option->val->string); - siridb->buffer_path = NULL; - if (option->val->string[len - 1] == '/') - { - siridb->buffer_path = strdup(option->val->string); - } - else if (asprintf( - &siridb->buffer_path, - "%s/", - option->val->string) < 0) - { - siridb->buffer_path = NULL; - } - } - else - { - siridb->buffer_path = siridb->dbpath; - } - - /* free cfgparser */ - cfgparser_free(cfgparser); - - if (siridb->buffer_path == NULL) + /* read database.conf */ + if (siridb__read_conf(siridb)) { - ERR_ALLOC + log_error("Could not read config for database '%s'", siridb->dbname); siridb_decref(siridb); return NULL; } @@ -283,26 +184,26 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) return NULL; } - /* load buffer */ - if (siridb_buffer_load(siridb)) + /* load shards */ + if (siridb_shards_load(siridb)) { - log_error("Could not read buffer for database '%s'", siridb->dbname); + log_error("Could not read shards for database '%s'", siridb->dbname); siridb_decref(siridb); return NULL; } - /* open buffer */ - if (siridb_buffer_open(siridb)) + /* load buffer */ + if (siridb_buffer_load(siridb)) { - log_error("Could not open buffer for database '%s'", siridb->dbname); + log_error("Could not read buffer for database '%s'", siridb->dbname); siridb_decref(siridb); return NULL; } - /* load shards */ - if (siridb_shards_load(siridb)) + /* open buffer */ + if (siridb_buffer_open(siridb->buffer)) { - log_error("Could not read shards for database '%s'", siridb->dbname); + log_error("Could not open buffer for database '%s'", siridb->dbname); siridb_decref(siridb); return NULL; } @@ -381,7 +282,7 @@ siridb_t * siridb_new(const char * dbpath, int lock_flags) * * (a SIGNAL can be set in case of an error) */ -static int SIRIDB_from_unpacker( +static int siridb__from_unpacker( qp_unpacker_t * unpacker, siridb_t ** siridb, const char * dbpath, @@ -417,7 +318,7 @@ static int SIRIDB_from_unpacker( } /* create a new SiriDB structure */ - *siridb = SIRIDB_new(); + *siridb = siridb__new(); if (*siridb == NULL) { sprintf(err_msg, "Cannot create SiriDB instance."); @@ -478,8 +379,8 @@ static int SIRIDB_from_unpacker( } /* bind buffer size and len to SiriDB */ - (*siridb)->buffer_size = (size_t) qp_obj.via.int64; - (*siridb)->buffer_len = (*siridb)->buffer_size / sizeof(siridb_point_t); + (*siridb)->buffer->size = (size_t) qp_obj.via.int64; + (*siridb)->buffer->len = (*siridb)->buffer->size / sizeof(siridb_point_t); /* read number duration */ if (qp_next(unpacker, &qp_obj) != QP_INT64) @@ -618,18 +519,10 @@ ssize_t siridb_get_file(char ** buffer, siridb_t * siridb) */ int siridb_open_files(siridb_t * siridb) { - int open_files = procinfo_open_files(siridb->dbpath); - - if ( siridb->buffer_path != siridb->dbpath && - strncmp( - siridb->dbpath, - siridb->buffer_path, - strlen(siridb->dbpath))) - { - open_files += procinfo_open_files(siridb->buffer_path); - } - - return open_files; + siridb_buffer_t * buffer = siridb->buffer; + return procinfo_open_files( + siridb->dbpath, + (buffer->fp == NULL) ? -1 : buffer->fd); } /* @@ -655,7 +548,7 @@ int siridb_save(siridb_t * siridb) qp_fadd_raw(fpacker, (const unsigned char *) siridb->uuid, 16) || qp_fadd_string(fpacker, siridb->dbname) || qp_fadd_int8(fpacker, siridb->time->precision) || - qp_fadd_int64(fpacker, siridb->buffer_size) || + qp_fadd_int64(fpacker, siridb->buffer->size) || qp_fadd_int64(fpacker, siridb->duration_num) || qp_fadd_int64(fpacker, siridb->duration_log) || qp_fadd_string(fpacker, iso8601_tzname(siridb->tz)) || @@ -678,7 +571,10 @@ void siridb__free(siridb_t * siridb) #endif /* first we should close the buffer and all other open files */ - siridb_buffer_free(siridb); + if (siridb->buffer != NULL) + { + siridb_buffer_free(siridb->buffer); + } if (siridb->dropped_fp != NULL) { @@ -696,9 +592,6 @@ void siridb__free(siridb_t * siridb) siridb_users_free(siridb->users); } - /* free buffer positions */ - slist_free(siridb->empty_buffers); - /* we do not need to free server and replica since they exist in * this list and therefore will be freed. */ @@ -753,12 +646,6 @@ void siridb__free(siridb_t * siridb) imap_free(siridb->shards, (imap_free_cb) &siridb__shard_decref); } - /* only free buffer path when not equal to db_path */ - if (siridb->buffer_path != siridb->dbpath) - { - free(siridb->buffer_path); - } - if (siridb->groups != NULL) { siridb_groups_decref(siridb->groups); @@ -787,7 +674,7 @@ void siridb__free(siridb_t * siridb) /* * Returns NULL and raises a SIGNAL in case an error has occurred. */ -static siridb_t * SIRIDB_new(void) +static siridb_t * siridb__new(void) { siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t)); if (siridb == NULL) @@ -827,9 +714,9 @@ static siridb_t * SIRIDB_new(void) } else { - /* allocate a list for buffer positions */ - siridb->empty_buffers = slist_new(SLIST_DEFAULT_SIZE); - if (siridb->empty_buffers == NULL) + /* allocate a buffer */ + siridb->buffer = siridb_buffer_new(); + if (siridb->buffer == NULL) { imap_free(siridb->shards, NULL); imap_free(siridb->series_map, NULL); @@ -845,7 +732,6 @@ static siridb_t * SIRIDB_new(void) siridb->ref = 1; siridb->insert_tasks = 0; siridb->flags = 0; - siridb->buffer_path = NULL; siridb->time = NULL; siridb->users = NULL; siridb->servers = NULL; @@ -856,7 +742,6 @@ static siridb_t * SIRIDB_new(void) siridb->drop_threshold = DEF_DROP_THRESHOLD; siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT; siridb->list_limit = DEF_LIST_LIMIT; - siridb->buffer_size = -1; siridb->tz = -1; siridb->server = NULL; siridb->replica = NULL; @@ -866,7 +751,6 @@ static siridb_t * SIRIDB_new(void) siridb->groups = NULL; /* make file pointers are NULL when file is closed */ - siridb->buffer_fp = NULL; siridb->dropped_fp = NULL; siridb->store = NULL; @@ -880,6 +764,137 @@ static siridb_t * SIRIDB_new(void) return siridb; } +static siridb_t * siridb__from_dat(const char * dbpath) +{ + int rc; + siridb_t * siridb = NULL; + char err_msg[512]; + qp_unpacker_t * unpacker; + char buffer[XPATH_MAX]; + + snprintf(buffer, + XPATH_MAX, + "%sdatabase.dat", + dbpath); + + unpacker = qp_unpacker_ff(buffer); + if (unpacker == NULL) + { + return NULL; + } + + if ((rc = siridb__from_unpacker( + unpacker, + &siridb, + dbpath, + err_msg)) < 0) + { + log_error("Could not read '%s': %s", buffer, err_msg); + qp_unpacker_ff_free(unpacker); + return NULL; + } + + qp_unpacker_ff_free(unpacker); + + if (rc > 0 && siridb_save(siridb)) + { + log_error("Could not write file: %s", buffer); + siridb_decref(siridb); + return NULL; + } + + return siridb; +} + +static int siridb__read_conf(siridb_t * siridb) +{ + int rc; + char buf[XPATH_MAX]; + cfgparser_t * cfgparser; + cfgparser_option_t * option = NULL; + siridb_buffer_t * buffer = siridb->buffer; + snprintf(buf, + XPATH_MAX, + "%sdatabase.conf", + siridb->dbpath); + + cfgparser = cfgparser_new(); + if (cfgparser == NULL) + { + return -1; /* signal is raised */ + } + + rc = cfgparser_read(cfgparser, buf); + + if (rc != CFGPARSER_SUCCESS) + { + log_error("Could not read '%s': %s", buf, cfgparser_errmsg(rc)); + cfgparser_free(cfgparser); + return -1; + } + + /* read buffer_path from database.conf */ + rc = cfgparser_get_option(&option, cfgparser, "buffer", "path"); + if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING) + { + size_t len = strlen(option->val->string); + buffer->path = NULL; + if (option->val->string[len - 1] == '/') + { + buffer->path = strdup(option->val->string); + } + else if ( + len >= 11 && + strcmp(option->val->string + (len-11), "/buffer.dat") == 0) + { + buffer->path = strndup(option->val->string, len-10); + } + else if (asprintf(&buffer->path, "%s/", option->val->string) < 0) + { + buffer->path = NULL; + } + } + else + { + buffer->path = strdup(siridb->dbpath); + } + + rc = cfgparser_get_option(&option, cfgparser, "buffer", "size"); + if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_INTEGER) + { + + } + + cfgparser_free(cfgparser); + + return (buffer->path == NULL) ? -1 : 0; +} + +static int siridb__lock(const char * dbpath, int lock_flags) +{ + lock_t lock_rc = lock_lock(dbpath, lock_flags); + + switch (lock_rc) + { + case LOCK_IS_LOCKED_ERR: + case LOCK_PROCESS_NAME_ERR: + case LOCK_WRITE_ERR: + case LOCK_READ_ERR: + case LOCK_MEM_ALLOC_ERR: + log_error("%s (%s)", lock_str(lock_rc), dbpath); + return -1; + case LOCK_NEW: + log_info("%s (%s)", lock_str(lock_rc), dbpath); + break; + case LOCK_OVERWRITE: + log_warning("%s (%s)", lock_str(lock_rc), dbpath); + break; + default: + assert (0); + break; + } + return 0; +} diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index b8ef59e7..6ca4b23f 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -904,7 +904,7 @@ static void INSERT_local_task(uv_async_t * handle) siridb = ilocal->siridb; - if (siridb->buffer_fp == NULL && siridb_buffer_open(siridb)) + if (siridb->buffer->fp == NULL && siridb_buffer_open(siridb->buffer)) { ERR_FILE ilocal->status = INSERT_LOCAL_ERROR; @@ -956,7 +956,7 @@ static void INSERT_local_task(uv_async_t * handle) if (siri.buffersync == NULL) { - if (siridb_buffer_fsync(siridb)) + if (siridb_buffer_fsync(siridb->buffer)) { log_critical("fsync() has failed on the buffer file"); } diff --git a/src/siri/db/props.c b/src/siri/db/props.c index 33977059..dcfbe980 100644 --- a/src/siri/db/props.c +++ b/src/siri/db/props.c @@ -277,7 +277,7 @@ static void prop_buffer_path( int map) { SIRIDB_PROP_MAP("buffer_path", 11) - qp_add_string(packer, siridb->buffer_path); + qp_add_string(packer, siridb->buffer->path); } static void prop_buffer_size( @@ -286,7 +286,7 @@ static void prop_buffer_size( int map) { SIRIDB_PROP_MAP("buffer_size", 11) - qp_add_int32(packer, (int32_t) siridb->buffer_size); + qp_add_int32(packer, (int32_t) siridb->buffer->size); } static void prop_dbname( diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 2d8b3627..2e3d9db3 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -133,7 +133,7 @@ int siridb_series_add_point( */ siridb_points_add_point(series->buffer, ts, val); - if (series->buffer->len == siridb->buffer_len) + if (series->buffer->len == siridb->buffer->len) { if (siridb_shards_add_points( siridb, @@ -145,7 +145,7 @@ int siridb_series_add_point( else { series->buffer->len = 0; - if (siridb_buffer_write_empty(siridb, series)) + if (siridb_buffer_write_empty(siridb->buffer, series)) { ERR_FILE rc = -1; @@ -154,7 +154,7 @@ int siridb_series_add_point( } else { - if (siridb_buffer_write_last_point(siridb, series)) + if (siridb_buffer_write_last_point(siridb->buffer, series)) { ERR_FILE log_critical("Cannot write new point to buffer"); @@ -181,7 +181,7 @@ int siridb_series_add_pcache( siridb_series_t *__restrict series, siridb_pcache_t *__restrict pcache) { - if (pcache->len > siridb->buffer_len || series->buffer == NULL) + if (pcache->len > siridb->buffer->len || series->buffer == NULL) { series->length += pcache->len; @@ -191,7 +191,7 @@ int siridb_series_add_pcache( (siridb_points_t *) pcache); } - if (pcache->len + series->buffer->len > siridb->buffer_len) + if (pcache->len + series->buffer->len > siridb->buffer->len) { series->length += pcache->len; @@ -217,7 +217,7 @@ int siridb_series_add_pcache( } series->buffer->len = 0; - if (siridb_buffer_write_empty(siridb, series)) + if (siridb_buffer_write_empty(siridb->buffer, series)) { ERR_FILE return -1; @@ -287,7 +287,7 @@ siridb_series_t * siridb_series_new( } /* create a buffer for series (except string series) */ - if (tp != TP_STRING && siridb_buffer_new_series(siridb, series)) + if (tp != TP_STRING && siridb_buffer_new_series(siridb->buffer, series)) { /* signal is raised */ log_critical("Could not create buffer for series '%s'.", @@ -354,7 +354,7 @@ void siridb__series_free(siridb_series_t *__restrict series) if (series->flags & SIRIDB_SERIES_IS_DROPPED) { slist_append_safe( - &series->siridb->empty_buffers, + &series->siridb->buffer->empty, (void *) series->bf_offset); } } diff --git a/src/siri/db/server.c b/src/siri/db/server.c index 6b994e4d..778c150b 100644 --- a/src/siri/db/server.c +++ b/src/siri/db/server.c @@ -711,8 +711,8 @@ static void SERVER_on_connect(uv_connect_t * req, int status) qp_add_int8(packer, siri.cfg->ip_support) || qp_add_string_term(packer, uv_version_string()) || qp_add_string_term(packer, siridb->dbpath) || - qp_add_string_term(packer, siridb->buffer_path) || - qp_add_int64(packer, (int64_t) siridb->buffer_size) || + qp_add_string_term(packer, siridb->buffer->path) || + qp_add_int64(packer, (int64_t) siridb->buffer->size) || qp_add_int32(packer, (int32_t) siri.startup_time) || qp_add_string_term(packer, siridb->server->address) || qp_add_int32(packer, (int32_t) siridb->server->port)) @@ -1013,7 +1013,7 @@ int siridb_server_cexpr_cb( return cexpr_str_cmp( cond->operator, (wserver->siridb->server == wserver->server) ? - wserver->siridb->buffer_path : + wserver->siridb->buffer->path : (wserver->server->buffer_path != NULL) ? wserver->server->buffer_path : "", cond->str); @@ -1022,7 +1022,7 @@ int siridb_server_cexpr_cb( return cexpr_int_cmp( cond->operator, (wserver->siridb->server == wserver->server) ? - wserver->siridb->buffer_size : + wserver->siridb->buffer->size : wserver->server->buffer_size, cond->int64); diff --git a/src/siri/db/servers.c b/src/siri/db/servers.c index 2342e571..9c6aea60 100644 --- a/src/siri/db/servers.c +++ b/src/siri/db/servers.c @@ -574,7 +574,7 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle) qp_add_string( query->packer, (siridb->server == server) ? - siridb->buffer_path : + siridb->buffer->path : (server->buffer_path != NULL) ? server->buffer_path : ""); break; @@ -582,7 +582,7 @@ int siridb_servers_list(siridb_server_t * server, uv_async_t * handle) qp_add_int64( query->packer, (siridb->server == server) ? - siridb->buffer_size : server->buffer_size); + siridb->buffer->size : server->buffer_size); break; case CLERI_GID_K_DBPATH: qp_add_string( -- 2.30.2